package redis_stream

import (
	"encoding/json"
	"errors"
	"fmt"
	"github.com/go-redis/redis"
	"strconv"
	"sync"
	"time"
)

// streamD polls a Redis sorted set on behalf of a consumer group: it reads
// the members that are due, hands them to the group's callback, and removes
// the ones that were handled successfully.
type streamD struct {
	// group is the consumer group being served. Run assigns it on its own
	// copy of the receiver before polling starts.
	group *ConsumerGroup
}

// Run attaches group to this runner and then polls in a loop: read a batch,
// dispatch it through callback, pause for 25ms, repeat. The loop stops only
// when read reports an error, and that error is what Run returns.
//
// The receiver is a value, so the group assignment only affects the copy
// used for the duration of this call.
func (s streamD) Run(group *ConsumerGroup) (err error) {
	s.group = group

	for {
		batch, readErr := s.read()
		if readErr != nil {
			err = readErr
			break
		}

		s.callback(batch)
		time.Sleep(25 * time.Millisecond)
	}

	// The loop above only exits with a non-nil error, so the generic error
	// below is a defensive fallback.
	if err == nil {
		return errors.New(fmt.Sprintf("listen redis zset [%s] error", s.group.stream))
	}

	return err
}

// callback dispatches the payloads currently buffered in `in` to the group's
// handler, one goroutine per payload, waits for all of them, and then removes
// every payload whose handler returned nil from the sorted set.
//
// Payloads whose handler returned an error are left in the sorted set.
func (s streamD) callback(in chan *Payload) {
	pending := len(in)
	handled := make(chan *Payload, pending)

	var workers sync.WaitGroup
	workers.Add(pending)
	for n := 0; n < pending; n++ {
		go func() {
			defer workers.Done()

			// Non-blocking receive: a worker that finds nothing ready
			// exits without invoking the handler.
			select {
			case item := <-in:
				if s.group.callback(item) == nil {
					// handled has room for every payload, so this send
					// never blocks.
					handled <- item
				}
			default:
			}
		}()
	}
	workers.Wait()

	// All workers are done; drain the successes and delete them.
	close(handled)
	for item := range handled {
		s.group.client.client.ZRem(s.group.stream, item.buf)
	}
}

// read fetches the members of the group's sorted set whose score lies between
// 0 and the current Unix time in microseconds, limited by the group's
// FetchCount option, and decodes each member from JSON into a Payload.
//
// Members that are not valid JSON are removed from the sorted set and
// skipped. Every decoded payload keeps the raw member in buf and has Stream
// set to the group's stream name.
//
// On success the returned channel is already closed and holds all decoded
// payloads in its buffer, so callers can use len() to learn the batch size.
// On failure the channel is nil and err is the Redis error.
func (s streamD) read() (payloads chan *Payload, err error) {
	args := redis.ZRangeBy{
		Min:   "0",
		Max:   strconv.FormatInt(time.Now().UnixMicro(), 10),
		Count: s.group.option.FetchCount,
	}

	result, err := s.group.client.client.ZRangeByScoreWithScores(s.group.stream, args).Result()
	if err != nil {
		return nil, err
	}

	// Size the buffer from what Redis actually returned rather than from
	// FetchCount: nothing receives from the channel while it is being filled,
	// so any send beyond the capacity would block forever. A FetchCount of 0
	// would give an unbuffered channel and a negative one would panic in make.
	payloads = make(chan *Payload, len(result))
	defer close(payloads)

	for _, z := range result {
		// Assumes the client returns members as strings; the assertion
		// panics otherwise, as it did before.
		member := z.Member.(string)

		var p Payload
		if decodeErr := json.Unmarshal([]byte(member), &p); decodeErr != nil {
			// Undecodable members can never be processed; drop them so they
			// are not fetched again on every poll.
			s.group.client.client.ZRem(s.group.stream, member)
			continue
		}

		p.buf = z.Member
		p.Stream = s.group.stream

		payloads <- &p
	}

	return payloads, nil
}
